package com.mjf.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.mjf.app.function.CustomerDeserialization;
import com.mjf.utils.MyKafkaUtil;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Uncomment together with the checkpoint/restart block below:
// import org.apache.flink.api.common.restartstrategy.RestartStrategies;
// import org.apache.flink.runtime.state.filesystem.FsStateBackend;
// import org.apache.flink.streaming.api.CheckpointingMode;

/**
 * Syncs the latest business data from MySQL (the gmall-flink database) to Kafka (topic ods_base_db).
 *
 */
public class FlinkCDC {
    public static void main(String[] args) throws Exception {

        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
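        // (The Debezium-based CDC source below is a non-parallel SourceFunction anyway)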

/*      // Disabled while testing
        // Configure the state backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9000/gmall-flink/checkpoint"));
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        // Note: a non-zero min pause implicitly caps concurrent checkpoints at one
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);

        // Configure the restart strategy; the delay argument is in milliseconds,
        // so 10 here means a 10 ms pause between attempts (10000L would give 10 s)
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10));
*/


        // Build the SourceFunction via Flink CDC and read MySQL change data
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop103")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall-flink")
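                // No .tableList() is set, so every table in the gmall-flink database is captured
                // CustomerDeserialization is the project's custom DebeziumDeserializationSchema<String>
                // (it presumably flattens each change record into a JSON string)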
                .deserializer(new CustomerDeserialization())
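                // latest(): start from the current binlog position; only new changes are read,
                // with no initial snapshot of existing rows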
                .startupOptions(StartupOptions.latest())
                .build();

        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // Print the data and write it to Kafka
        streamSource.print();
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));
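        // MyKafkaUtil.getKafkaProducer is assumed to return a FlinkKafkaProducer<String>
        // preconfigured with the project's broker list (helper not shown in this file)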

        env.execute(FlinkCDC.class.getName());

    }
}
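
// For reference, a minimal sketch of what MyKafkaUtil.getKafkaProducer might look like.
// The broker list "hadoop102:9092" and the use of SimpleStringSchema are assumptions for
// illustration; the real helper lives in com.mjf.utils and is not shown in this file.
//
//     public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
//         // hypothetical broker address; replace with the cluster's actual brokers
//         return new FlinkKafkaProducer<>("hadoop102:9092", topic, new SimpleStringSchema());
//     }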
